package fun.easycode.datastream;

import org.apache.rocketmq.client.exception.MQClientException;

/**
 * 数据增量处理管理器
 * @author xuzhen97
 */
public class DataQuantityManager {

    private final CanalAdapter canalAdapter;
    private final DataStreamProperties dataStreamProperties;

    public DataQuantityManager(CanalAdapter canalAdapter, DataStreamProperties dataStreamProperties) {
        this.canalAdapter = canalAdapter;
        this.dataStreamProperties = dataStreamProperties;
    }

    /**
     * 注册一个数据处理器
     * @param processor 数据处理器
     * @param <T> 数据类型
     * @throws MQClientException 启动异常
     */
    public <T> void register(IDataProcessor<T> processor) throws MQClientException {
        // 第一步，创建sink, 用于对接canal接受数据并输出到rocketmq
        DataEntryMQSink<T> sink = new DataEntryMQSink<>(processor, dataStreamProperties);
        // 第二步，创建consumer，用于消费rocketmq中的数据
        DataEntryConsumer<T> consumer = new DataEntryConsumer<>(processor, dataStreamProperties);
        // 第三步，启动sink和consumer
        sink.start();
        consumer.start();
        // 第四步，将sink注册到canal中
        canalAdapter.registerListener(sink);
    }

    /**
     * 注册一个数据转换器
     * @param transformer 数据处理器
     * @param <T> 数据类型
     * @param <R> 数据类型
     * @throws MQClientException 启动异常
     */
    public <T,R> void register(IDataTransformer<T,R> transformer) throws MQClientException {
        // 第一步，创建sink, 用于对接canal接受数据并输出到rocketmq
        DataTransformerEntryMQSink<T,R> sink = new DataTransformerEntryMQSink<>(transformer, dataStreamProperties);
        // 第二步，创建consumer，用于消费rocketmq中的数据
        DataTransformerEntryConsumer<T,R> consumer = new DataTransformerEntryConsumer<>(transformer, dataStreamProperties);
        // 第三步，启动sink和consumer
        sink.start();
        consumer.start();
        // 第四步，将sink注册到canal中
        canalAdapter.registerListener(sink);
    }
}
